package org.budo.warehouse.logic.consumer;

import java.util.List;

import javax.annotation.Resource;

import org.budo.support.dao.page.Page;
import org.budo.warehouse.logic.api.DataConsumer;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.IEventFilterLogic;
import org.budo.warehouse.logic.bean.DynamicBeanService;
import org.budo.warehouse.service.api.IDataNodeService;
import org.budo.warehouse.service.api.IFieldMappingService;
import org.budo.warehouse.service.api.IPipelineService;
import org.budo.warehouse.service.entity.DataNode;
import org.budo.warehouse.service.entity.FieldMapping;
import org.budo.warehouse.service.entity.Pipeline;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * @author limingwei
 */
@Slf4j
@Component("dispatcherDataConsumer")
public class DispatcherDataConsumer implements DataConsumer {
    @Resource
    private IPipelineService pipelineService;

    @Resource
    private IDataNodeService dataNodeService;

    @Resource
    private DynamicBeanService dynamicBeanService;

    @Resource
    private IEventFilterLogic eventFilterLogic;

    @Resource
    private IFieldMappingService fieldMappingService;

    @Override
    public void consume(DataEntry dataEntry) {
        Integer dataNodeId = dataEntry.getSourceDataNodeId();
        List<Pipeline> pipelines = pipelineService.listBySourceDataNodeCached(dataNodeId, Page.max());

        for (Pipeline pipeline : pipelines) {
            if (!eventFilterLogic.filter(pipeline, dataEntry)) {
                continue; // 事件过滤
            }

            List<FieldMapping> fieldMappings = fieldMappingService.listByPipelineIdCached(pipeline.getId()); // 字段对应关系
            FieldMappingDataEntry fieldMappingDataEntry = new FieldMappingDataEntry(dataEntry, fieldMappings, pipeline);
            this.pipelineHandleEntry(fieldMappingDataEntry, pipeline);
        }
    }

    private void pipelineHandleEntry(DataEntry dataEntry, Pipeline pipeline) {
        DataNode targetDataNode = dataNodeService.findByIdCached(pipeline.getTargetDataNodeId());
        DataConsumer dataConsumer = dynamicBeanService.dataConsumer(targetDataNode, pipeline);

        try {
            dataConsumer.consume(dataEntry);
        } catch (Throwable e) {
            log.error("#69 data consume error, dataConsumer=" + dataConsumer + ", dataEntry=" + dataEntry + ", e=" + e, e);
        }
    }
}